package custom;

import beans.SensorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Custom aggregate function that computes the average temperature of sensor readings.
 *
 * <p>The accumulator is a {@code Tuple2<Double, Integer>}: {@code f0} holds the running sum of
 * the values returned by {@code SensorReading.getTemperature()} and {@code f1} holds the number
 * of readings added so far.
 *
 * @author lvbingbing
 * @date 2022-01-01 13:05
 */
public class MyAverageAggregateFunction implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double> {

    private static final long serialVersionUID = 7719180656949389504L;

    /**
     * Creates an empty accumulator.
     *
     * @return a new tuple with a sum of {@code 0.0} and a count of {@code 0}
     */
    @Override
    public Tuple2<Double, Integer> createAccumulator() {
        return Tuple2.of(0.0, 0);
    }

    /**
     * Folds one reading into the accumulator.
     *
     * @param sensorReading the reading whose temperature is added to the sum
     * @param accumulator   the current (sum, count) pair; it is not modified
     * @return a new tuple holding the updated sum and the count incremented by one
     */
    @Override
    public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
        // A fresh tuple is returned instead of mutating the incoming accumulator.
        return Tuple2.of(accumulator.f0 + sensorReading.getTemperature(), accumulator.f1 + 1);
    }

    /**
     * Computes the average from the accumulated sum and count.
     *
     * @param accumulator the (sum, count) pair to evaluate
     * @return the sum divided by the count; for an accumulator that is still (0.0, 0) the
     *         floating-point division yields {@code NaN}
     */
    @Override
    public Double getResult(Tuple2<Double, Integer> accumulator) {
        // Both fields are unboxed, so this is a double division (no integer truncation).
        return accumulator.f0 / accumulator.f1;
    }

    /**
     * Combines two partial accumulators into one.
     *
     * @param accumulator1 the first (sum, count) pair; it is not modified
     * @param accumulator2 the second (sum, count) pair; it is not modified
     * @return a new tuple holding the sum of both sums and the sum of both counts
     */
    @Override
    public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> accumulator1, Tuple2<Double, Integer> accumulator2) {
        return new Tuple2<>(accumulator1.f0 + accumulator2.f0, accumulator1.f1 + accumulator2.f1);
    }
}
